Cloud Composer で Dataplex のデータリネージ統合を試してみた
こんにちは!エノカワです。
Cloud Composer は、Apache Airflow で構築されたフルマネージドのワークフローオーケストレーションサービスです。
Apache Airflow のオープンソース プロジェクトを基に構築されており、Python プログラミング言語を使用して動作します。
試したこと
今回は、Cloud Composer で Dataplex のデータリネージ統合を試してみました。
下記のドキュメントに沿って実施してみましたので、その内容をご紹介します。
Data Lineage API を有効にする
まずは Dataplex のデータリネージ機能を利用するために、Data Lineage API を有効にします。
APIの有効化は Google Cloud コンソールの 「APIとサービス」 メニューから行います。
Data Lineage API を有効にすると、Dataplex は BigQuery、Cloud Data Fusion、Dataproc のデータの取り込みを自動的に開始します。
環境作成
次に、Cloud Composer 環境を作成します。
Google Cloud コンソールの「Cloud Composer の環境作成」ページから、test-composer
という名前の環境を、東京リージョンで作成しました。
サービスアカウントなどその他の設定はデフォルトのままで作成しました。
Cloud Composer 環境でデータリネージ統合を有効にする
Cloud Composer 環境でデータリネージ統合を有効にするには、Composer の環境設定を開き、必要なオプションを設定します。
環境構成の 「Dataplex データリネージ統合」 を編集します。
「Dataplex データリネージとの統合を有効にする」 を選択して保存します。
環境の更新が行われ、しばらくすると環境構成の 「Dataplex データリネージ統合」 が有効になりました。
DAG を作成する
Cloud Composer 環境でデータリネージ統合を有効にして、サポートされているオペレーター を利用する DAG を実行すると、Cloud Composer が Data Lineage API にリネージ情報を報告します。
2024年9月時点でサポートされているオペレーターは下記のとおりです。
airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator
airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator
airflow.providers.google.cloud.transfers.bigquery_to_bigquery.BigQueryToBigQueryOperator
airflow.contrib.operators.bigquery_to_gcs.BigQueryToCloudStorageOperator
airflow.providers.google.cloud.transfers.bigquery_to_gcs.BigQueryToGCSOperator
airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSToBigQueryOperator
airflow.contrib.operators.gcs_to_bq.GoogleCloudStorageToBigQueryOperator
airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator
今後も追加されていくと思われますので最新のサポート状況は下記ドキュメントを参照ください。
今回はGCSToBigQueryOperator
を使用してみました。
以下は、Google Cloud Storage (GCS) にあるファイルを BigQuery に取り込むための Airflow DAG の例です。
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
PROJECT_ID = '{プロジェクトID}'
DATASET_ID = 'work'
TABLE_ID = 'sales_data'
BUCKET_NAME = 'cm_enokawa_work'
# DAGの基本設定
default_args = {
'start_date': days_ago(1),
}
with DAG(
dag_id='gcs_to_bigquery_lineage_dag',
default_args=default_args,
schedule_interval=None, # 手動で実行する
catchup=False
) as dag:
# GCS から BigQuery へのデータ取り込み
gcs_to_bq = GCSToBigQueryOperator(
task_id='load_gcs_to_bq',
bucket=BUCKET_NAME, # GCS バケット名
source_objects=['sales.csv'], # GCS 内のファイルパス
destination_project_dataset_table=f'{PROJECT_ID}.{DATASET_ID}.{TABLE_ID}', # BigQuery のデータセットとテーブル
source_format='CSV', # ソースファイル形式
write_disposition='WRITE_TRUNCATE', # 既存データの上書き
skip_leading_rows=1 # CSV のヘッダー行をスキップ
)
gcs_to_bq
この DAG では、Google Cloud Storage に保存された CSV ファイル (sales.csv
) を BigQuery のテーブル(sales_data
)にインポートしています。
GCSToBigQueryOperator
によるリネージの追跡は自動的に行われるため、特に追加の操作は必要ありません。
DAG を実行する
Cloud Composer の DAG 実行画面から先ほど作成した DAG を手動実行します。
DAG の実行が完了し、load_gcs_tobq
タスクが成功したことを確認しました。
これでデータが GCS から BigQuery に取り込まれました。
Dataplex の UI 上でリネージグラフを確認してみましょう。
リネージグラフ を確認する
Dataplex の UI からデータ取り込み先である BigQuery テーブル sales_data
を検索します。
sales_data
を選択して 「リネージ」 を表示します。
リネージグラフは以下のように表示され、GCS(sales.csv
) から BigQuery(sales_data
) へのデータの流れが視覚的に確認できました。
GCS から BigQuery へのフロー上にある Cloud Composer のアイコンをクリックすると、環境名や DAG ID や Task ID などの情報を確認することができます。
また、「COMPOSER で環境を開く」 や 「COMPSER で DAG を開く」 から各画面を開くこともできます。
まとめ
以上、Cloud Composer で Dataplex を使ってデータリネージの統合を試してみました。
Dataplex のデータリネージ統合を有効にすることにより、データフローの可視化や追跡が非常に簡単にできました!
特に、サポートされているオペレーターを利用すれば、自動的にリネージ情報がキャプチャされるため、複雑なデータパイプラインの全体像を把握するのに有力な機能ではないでしょうか。
今回は自動リネージレポートでサポートされているオペレーターGCSToBigQueryOperator
を使用しました。
サポートされていないオペレーターでも カスタム リネージ イベント を送信することでリネージを報告することが可能のようですので、次回試してみたいと思います。